package com.starsky.common.hbase.service;

import com.starsky.common.hbase.entity.HBaseUser;
import com.starsky.common.hbase.entity.InsertData;
import com.starsky.common.hbase.entity.SearchData;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import org.springframework.data.hadoop.hbase.RowMapper;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@Service
public class HbaseService {

    @Autowired
    private HbaseTemplate hbaseTemplate;

    /**
     * 根据表名称和行名称(主键)获取一行信息
     *
     * @param tableName 表名称
     * @param rowName   主键
     * @return
     */
    public Map<String, Object> single(String tableName, String rowName) {
        Map result;
        result = hbaseTemplate.get(tableName, rowName, (RowMapper<Map>) (result1, i) -> {
            List<Cell> ceList = result1.listCells();
            Map<String, Object> map = new HashMap<>();
            if (ceList != null && ceList.size() > 0) {
                for (Cell cell : ceList) {
                    String column = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                    String key = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) + "_" + column;
                    String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                    map.put(key, value);
                }
            }
            return map;
        });
        return result;
    }

    /**
     * 获取表中某列某个字段的信息
     *
     * @param tableName  表名
     * @param rowName    主键名
     * @param familyName 列名
     * @param qualifier  列族
     * @return
     */
    public String get(String tableName, String rowName, String familyName, String qualifier) {
        return hbaseTemplate.get(tableName, rowName, familyName, qualifier, (result, i) -> {
            List<Cell> ceList = result.listCells();
            String res = "";
            if (ceList != null && ceList.size() > 0) {
                for (Cell cell : ceList) {
                    res = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                }
            }
            return res;
        });
    }

    /**
     * 获取表中所有数据
     *
     * @param tableName 表名
     * @return
     */
    public List<Map<String, Object>> all(String tableName) {
        Scan scan = new Scan();
        return hbaseTemplate.find(tableName, scan, (result, rowNum) -> {
            List<Cell> ceList = result.listCells();
            Map<String, Object> map = new HashMap<>();
            String row = "";
            if (ceList != null && ceList.size() > 0) {
                for (Cell cell : ceList) {
                    row = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                    String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                    String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
                    String quali = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                    Map<String, String> quVal;
                    if (map.get(family) != null) {
                        quVal = (Map<String, String>) map.get(family);
                    } else {
                        quVal = new HashMap<>();
                    }
                    quVal.put(quali, value);
                    map.put(family, quVal);
                }
                map.put("row", row);
            }
            return map;
        });
    }

    /**
     * 保存/修改数据
     * 如果rowName已经存在则为修改, 否则为新增
     *
     * @param tableName 表名
     * @param rowName   主键
     * @param data      保存的数据 键名-键族名-数据 (无键族则键族置空即可)
     * @return
     */
    public Boolean save(String tableName, String rowName, List<InsertData> data) {
        Boolean result = hbaseTemplate.execute(tableName, (hTableInterface) -> {
            boolean flag = false;
            try {
                Put put = new Put(rowName.getBytes());
                if (data != null && data.size() > 0) {
                    for (InsertData insertData : data) {
                        put.addColumn(insertData.getFamily().getBytes(), insertData.getQualifier().getBytes(), insertData.getData().getBytes());
                    }
                }
                hTableInterface.put(put);
                flag = true;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return flag;
        });
        return result;
    }

    /**
     * 保存/修改用户信息
     * @param user
     * @return
     */
    public Boolean save(HBaseUser user) {
        if (user.getQualifierValues()==null || user.getQualifierValues().size()==0) {
            log.info("保存的用户行为信息为空");
            return false;
        }
        Boolean result = hbaseTemplate.execute(user.getTableName(), (hTableInterface) -> {
            boolean flag = false;
            try {
                Put put = new Put(user.getRowKey().getBytes());
                if (user.getQualifierValues() != null && user.getQualifierValues().size() > 0) {
                    for (InsertData insertData : user.getQualifierValues()) {
                        put.addColumn(insertData.getFamily().getBytes(), insertData.getQualifier().getBytes(), insertData.getData().getBytes());
                    }
                }
                hTableInterface.put(put);
                flag = true;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return flag;
        });
        return result;
    }

    /**
     * 根据条件查询数据 仅查询相等数据
     *
     * @return
     */
    public List<Map<String, Map<String, String>>> findData(SearchData search) throws IOException {
        Scan scan = new Scan();
        // 开始结束行设置
        if (StringUtils.isNotEmpty(search.getStartRow()) && StringUtils.isNotEmpty(search.getEndRow())) {
            scan.withStartRow(Bytes.toBytes(search.getStartRow()));
            scan.withStopRow(Bytes.toBytes(search.getEndRow()));
        }
        // 设置时间条件
        if (search.getSearchStartTime() != null && search.getSearchEndTime() != null) {
            scan.setTimeRange(search.getSearchStartTime().getTime(), search.getSearchEndTime().getTime());
        }
        // 装入搜索条件
        if (StringUtils.isNotEmpty(search.getVal()) && StringUtils.isNotEmpty(search.getFamily()) && StringUtils.isNotEmpty(search.getQualifier())) {
            Filter filter = new SingleColumnValueFilter(Bytes.toBytes(search.getFamily()), Bytes.toBytes(search.getQualifier()), CompareFilter.CompareOp.EQUAL, Bytes.toBytes(search.getVal()));
            scan.setFilter(filter);
        }
        List<Map<String, Map<String, String>>> md = hbaseTemplate.find(search.getTableName(), scan, (result, rowNum) -> {
            Cell[] cells = result.rawCells();
            Map<String, Map<String, String>> data = new HashMap<>();
            for (Cell c : cells) {
                String row = new String(CellUtil.cloneRow(c));
                String columnFamily = new String(CellUtil.cloneFamily(c));
                String quar = new String(CellUtil.cloneQualifier(c));
                String value = new String(CellUtil.cloneValue(c));
                Map<String, String> obj = data.get(row);
                if (null == obj) {
                    obj = new HashMap<>();
                    data.put(row, obj);
                }
                obj.put(columnFamily + "_" + quar, value);
            }
            return data;
        });
        List<Map<String, Map<String, String>>> result = new ArrayList<>();
        for (Map data : md) {
            if (data != null && data.keySet() != null && data.keySet().size() > 0) {
                result.add(data);
            }
        }
        return result;
    }


}
